package cn.zhaopin.starter.mq.support;

import cn.zhaopin.starter.mq.common.PulsarMessage;
import cn.zhaopin.starter.mq.common.PulsarMessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;

import java.nio.charset.Charset;
import java.util.Objects;

/**
 * Description: 消息转换工具类
 *
 * @author: zuomin (myleszelic@outlook.com)
 * @date: 2021/07/31-15:10
 */
@SuppressWarnings("rawtypes")
public class PulsarMessageUtils {

    private static final String CHARSET = "UTF-8";

    public static PulsarMessage convertToPulsarMessage(MessageConverter messageConverter, Message message) {
        Object payloadObject = message.getPayload();
        MessageHeaders headers = message.getHeaders();

        Object payload;
        try {
            if (Objects.isNull(payloadObject)) {
                throw new RuntimeException("the message cannot be empty");
            }
            if (payloadObject instanceof String) {
                payload = payloadObject.toString();
            } else if (payloadObject instanceof byte[]) {
                payload = new String((byte[]) message.getPayload(), Charset.forName(CHARSET));
            } else {
                Object jsonObj = messageConverter.fromMessage(message, payloadObject.getClass());
                if (null == jsonObj) {
                    throw new RuntimeException(String.format(
                            "empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]",
                            messageConverter.getClass(), payloadObject.getClass(), payloadObject));
                }
                payload = jsonObj;
            }
        } catch (Exception e) {
            throw new RuntimeException("convert to Pulsar message failed.", e);
        }
        return PulsarMessageBuilder.withPayload(payload).copyHeaders(headers).build();
    }

}
